---
title: How to cache workflow step outputs
sidebarTitle: Cache workflow step outputs
---

The simplest way to cache the results of tasks within in a flow is to set `persist_result=True` on a task definition.

```python
from prefect import task, flow

@task(persist_result=True)
def add_one(x: int):
    return x + 1

@flow
def my_flow():
    add_one(1) # will not be cached
    add_one(1) # will be cached
    add_one(2) # will not be cached

if __name__ == "__main__":
    my_flow()
```

<Accordion title="Output">

```python
16:28:51.230 | INFO    | Flow run 'outgoing-cicada' - Beginning flow run 'outgoing-cicada' for flow 'my-flow'
16:28:51.257 | INFO    | Task run 'add_one-d85' - Finished in state Completed()
16:28:51.276 | INFO    | Task run 'add_one-eff' - Finished in state Cached(type=COMPLETED)
16:28:51.294 | INFO    | Task run 'add_one-d16' - Finished in state Completed()
16:28:51.311 | INFO    | Flow run 'outgoing-cicada' - Finished in state Completed()
```

</Accordion>

This will implicitly use the `DEFAULT` cache policy, which is a composite cache policy defined as:

```python
DEFAULT = INPUTS + TASK_SOURCE + RUN_ID
```

This means subsequent calls of a task with identical inputs from within the same parent run will return cached results without executing the body of the function.

<Note>
The `TASK_SOURCE` component of the `DEFAULT` cache policy helps avoid naming collisions between similar tasks that should not share a cache.
</Note>


### Cache based on inputs

To cache the result of a task based only on task inputs, set `cache_policy=INPUTS` in the task decorator:

```python
from prefect import task
from prefect.cache_policies import INPUTS

import time


@task(cache_policy=INPUTS)
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1) # does not sleep
my_stateful_task(x=2) # sleeps
```

The above task will sleep the first time it is called with `x=1`, but will not sleep for any subsequent calls with the same input.

Prefect ships with several [cache policies](/v3/concepts/caching#cache-policies) that can be used to customize caching behavior.

### Cache based on a subset of inputs

To cache based on a subset of inputs, you can subtract kwargs from the `INPUTS` cache policy.

```python
from prefect import task
from prefect.cache_policies import INPUTS

import time


@task(cache_policy=INPUTS - 'debug')
def my_stateful_task(x: int, debug: bool = False):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1, debug=True) # does not sleep
my_stateful_task(x=1, debug=False) # does not sleep
```


### Cache with an expiration

To cache with an expiration, set the `cache_expiration` parameter on the task decorator.

```python
from prefect import task
from prefect.cache_policies import INPUTS

import time
from datetime import timedelta


@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=10))
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1) # does not sleep
# ... 10 seconds pass ...
my_stateful_task(x=1) # sleeps again
```

### Ignore the cache

To ignore the cache regardless of the cache policy, set the `refresh_cache` parameter on the task decorator.

```python
import random

from prefect import task
from prefect.cache_policies import TASK_SOURCE


@task(cache_policy=TASK_SOURCE, refresh_cache=True)
def never_caching_task():
    return random.random()
```

To refresh the cache for all tasks, use the `PREFECT_TASKS_REFRESH_CACHE` setting. 
Setting `PREFECT_TASKS_REFRESH_CACHE=true` changes the default behavior of all tasks to refresh. 

If you have tasks that should not refresh when this setting is enabled, set `refresh_cache` to `False`. These tasks will never write to the cache. If a cache key exists it will be read, not updated. 
If a cache key does _not_ exist yet, these tasks can still write to the cache.

```python
import random

from prefect import task
from prefect.cache_policies import TASK_SOURCE


@task(cache_policy=TASK_SOURCE, refresh_cache=False)
def caching_task():
    return random.random()
```

### Cache on multiple criteria

Cache policies can be combined using the `+` operator.

```python
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(cache_policy=INPUTS + TASK_SOURCE)
def my_task(x: int):
    return x + 1
```

The above task will use a cached result as long as the same inputs and task source are used.

### Cache in a distributed environment

By default Prefect stores results locally in `~/.prefect/storage/`. To share the cache across tasks running on different machines, provide a storage block to the `result_storage` parameter on the task decorator.

Here's an example with of a task that uses an S3 bucket to store cache records:

```python
from prefect import task
from prefect.cache_policies import INPUTS

from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)
# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")

@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42
```

<Note>
When using a storage block from a Prefect integration package, the package the storage block is imported from must be installed in all environments where the task will run.

For example, the `prefect_aws` package must be installed to use the `S3Bucket` storage block.
</Note>

### Further reading

For more advanced caching examples, see the [advanced caching how-to guide](/v3/advanced/caching).

For more information on Prefect's caching system, see the [caching concepts page](/v3/concepts/caching).

